
DataX custom function: dx_groovy

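DataX is an offline data synchronization tool open-sourced by Alibaba Cloud. It provides stable, efficient synchronization between heterogeneous data sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, MaxCompute (formerly ODPS), HBase, FTP, and more.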

DataX's data transformation supports user-defined functions (UDFs).

The official usage notes are as follows:

dx_groovy

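  • Parameters:
    • First parameter: groovy code
    • Second parameter (a list, or empty): extraPackage
  • Notes:
    • dx_groovy can only be called once; it cannot be invoked multiple times.
    • The groovy code can use the java.lang and java.util packages. The objects it can reference directly are record and the column types under element (BoolColumn.class, BytesColumn.class, DateColumn.class, DoubleColumn.class, LongColumn.class, StringColumn.class). No other packages are available; if you need one, set extraPackage. Note that extraPackage does not support third-party jars.
    • The groovy code returns either the updated Record (e.g. record.setColumn(columnIndex, new StringColumn(newValue));) or null. Returning null filters out the row.
    • Static utility methods (GroovyTransformerStaticUtil) can be called directly. The current list of GroovyTransformerStaticUtil methods (to be extended as needed):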
  • Examples:
    subStr implemented in Groovy:
    String code = "Column column = record.getColumn(1);\n" +
            " String oriValue = column.asString();\n" +
            " String newValue = oriValue.substring(0, 3);\n" +
            " record.setColumn(1, new StringColumn(newValue));\n" +
            " return record;";
    dx_groovy(code);
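The contract described in the notes (the code receives `record`, changes it through `setColumn`, and returns either the record or `null` to drop the row) can be tried outside DataX. The following is a minimal plain-Java sketch, not DataX code: `SimpleRecord` is a hypothetical stand-in for DataX's `Record` that stores plain strings instead of `Column` objects, and `subStr` mirrors the Groovy subStr example on column 1. Two behaviours are assumptions added for illustration: a null column 1 drops the row, and values shorter than 3 characters are kept as-is instead of throwing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SubStrSketch {

    // Hypothetical stand-in for DataX's Record: columns are plain strings here,
    // whereas the real Record holds Column objects (StringColumn, LongColumn, ...).
    static class SimpleRecord {
        private final List<String> columns;

        SimpleRecord(String... values) {
            this.columns = new ArrayList<>(Arrays.asList(values));
        }

        String getColumn(int index) {
            return columns.get(index);
        }

        void setColumn(int index, String value) {
            columns.set(index, value);
        }
    }

    // Mirrors the Groovy subStr example: keep the first 3 characters of column 1.
    // Returning null plays the role of "filter this row" in dx_groovy.
    static SimpleRecord subStr(SimpleRecord record) {
        String oriValue = record.getColumn(1);
        if (oriValue == null) {
            return null; // assumption: drop rows whose column 1 is null
        }
        // Math.min avoids StringIndexOutOfBoundsException for short values (assumption).
        String newValue = oriValue.substring(0, Math.min(3, oriValue.length()));
        record.setColumn(1, newValue);
        return record;
    }

    public static void main(String[] args) {
        SimpleRecord r = subStr(new SimpleRecord("1", "abcdef"));
        System.out.println(r.getColumn(1)); // abc
    }
}
```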
Replace implemented in Groovy:
String code2 = "Column column = record.getColumn(1);\n" +
" String oriValue = column.asString();\n" +
" String newValue = \"****\" + oriValue.substring(3, oriValue.length());\n" +
" record.setColumn(1, new StringColumn(newValue));\n" +
" return record;";
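The Replace example masks the first three characters of column 1. The same string logic in plain Java is sketched below; `mask` is a hypothetical helper name, and the handling of null and of values shorter than 3 characters is an assumption (the Groovy original would throw `StringIndexOutOfBoundsException` on a value shorter than 3 characters).

```java
public class MaskSketch {

    // Same logic as the Groovy Replace example: replace the first 3 characters with "****".
    // Assumption: null stays null, and values shorter than 3 characters become "****"
    // instead of throwing as oriValue.substring(3, ...) would.
    static String mask(String oriValue) {
        if (oriValue == null) {
            return null;
        }
        if (oriValue.length() < 3) {
            return "****";
        }
        return "****" + oriValue.substring(3);
    }

    public static void main(String[] args) {
        System.out.println(mask("13812345678")); // ****12345678
    }
}
```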
Pad implemented in Groovy:
String code3 = "Column column = record.getColumn(1);\n" +
" String oriValue = column.asString();\n" +
" String padString = \"12345\";\n" +
" String finalPad = \"\";\n" +
" int NeedLength = 8 - oriValue.length();\n" +
" while (NeedLength > 0) {\n" +
"\n" +
" if (NeedLength >= padString.length()) {\n" +
" finalPad += padString;\n" +
" NeedLength -= padString.length();\n" +
" } else {\n" +
" finalPad += padString.substring(0, NeedLength);\n" +
" NeedLength = 0;\n" +
" }\n" +
" }\n" +
" String newValue= finalPad + oriValue;\n" +
" record.setColumn(1, new StringColumn(newValue));\n" +
" return record;";
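The Pad example left-pads column 1 to 8 characters by repeating "12345" and truncating the last repetition. Below is a plain-Java sketch of that loop with the target length and pad string turned into parameters; `leftPad` is a hypothetical name, and `StringBuilder` replaces repeated string concatenation. The empty-pad check is an added assumption, since an empty pad string would loop forever.

```java
public class PadSketch {

    // Same algorithm as the Groovy Pad example: prepend repetitions of padString
    // (truncated on the last round) until the value reaches targetLength.
    // Values already at or above targetLength are returned unchanged.
    static String leftPad(String oriValue, int targetLength, String padString) {
        if (padString.isEmpty()) {
            throw new IllegalArgumentException("padString must not be empty");
        }
        StringBuilder finalPad = new StringBuilder();
        int needLength = targetLength - oriValue.length();
        while (needLength > 0) {
            if (needLength >= padString.length()) {
                finalPad.append(padString);              // whole pad string fits
                needLength -= padString.length();
            } else {
                finalPad.append(padString, 0, needLength); // only a prefix fits
                needLength = 0;
            }
        }
        return finalPad.append(oriValue).toString();
    }

    public static void main(String[] args) {
        System.out.println(leftPad("a", 8, "12345")); // 1234512a
    }
}
```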
  • Below is a string-replacement dx_groovy function I implemented myself, as a complete job configuration:
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "column": ["user_role_id", "username", "ROLE"],
                            "connection": [
                                {
                                    "jdbcUrl": ["jdbc:mysql://10.1.18.155:3306/saiku_test"],
                                    "table": ["user_roles_copy"]
                                }
                            ],
                            "password": "123456",
                            "username": "root",
                            "where": ""
                        }
                    },
                    "writer": {
                        "name": "mysqlwriter",
                        "parameter": {
                            "column": ["user_role_id", "username", "ROLE"],
                            "connection": [
                                {
                                    "jdbcUrl": "jdbc:mysql://10.1.18.155:3306/saiku_test",
                                    "table": ["user_roles_trans"]
                                }
                            ],
                            "password": "123456",
                            "preSql": [],
                            "session": [],
                            "username": "root",
                            "writeMode": "insert"
                        }
                    },
                    "transformer": [
                        {
                            "name": "dx_filter",
                            "parameter": {
                                "columnIndex": 1,
                                "paras": ["=", "null"]
                            }
                        },
                        {
                            "name": "dx_groovy",
                            "parameter": {
                                "code": "Column column = record.getColumn(2);\nString oriValue = column.asString();\nString sourceString = \"ROLE_ADMIN\";\nString changeString = \"replaceTest\";\nif (sourceString.equals(oriValue)) {\nrecord.setColumn(2, new StringColumn(changeString));\n}\nreturn record;",
                                "extraPackage": []
                            }
                        }
                    ]
                }
            ],
            "setting": {
                "speed": {
                    "channel": "1"
                }
            }
        }
    }
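With the reader columns ["user_role_id", "username", "ROLE"], index 1 is username and index 2 is ROLE. Per the official dx_filter notes, `"=", "null"` filters out rows whose column is null; the dx_groovy step then rewrites ROLE_ADMIN to replaceTest. The sketch below roughly emulates that two-step chain on rows modelled as `List<String>`. It is a simplification for checking expectations, not DataX's implementation; `JobSketch` and `transform` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JobSketch {

    // Rough emulation of the two transformers in the job above, applied in order.
    // Rows are List<String> here; DataX itself works on Record/Column objects.
    static List<List<String>> transform(List<List<String>> rows) {
        List<List<String>> out = new ArrayList<>();
        for (List<String> row : rows) {
            // Step 1, dx_filter (columnIndex 1, paras ["=", "null"]): drop rows with a null username.
            if (row.get(1) == null) {
                continue;
            }
            // Step 2, dx_groovy: replace ROLE_ADMIN in column 2; the comparison is null-safe.
            List<String> copy = new ArrayList<>(row);
            if ("ROLE_ADMIN".equals(copy.get(2))) {
                copy.set(2, "replaceTest");
            }
            out.add(copy);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> rows = Arrays.asList(
                Arrays.asList("1", "alice", "ROLE_ADMIN"),
                Arrays.asList("2", null, "ROLE_ADMIN"),
                Arrays.asList("3", "bob", "ROLE_USER"));
        // prints [[1, alice, replaceTest], [3, bob, ROLE_USER]]
        System.out.println(transform(rows));
    }
}
```

Writing `sourceString.equals(oriValue)` rather than `oriValue.equals(sourceString)` in the Groovy code matters for the same reason: the filter only checks column 1, so a null ROLE in column 2 would otherwise raise a NullPointerException.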